Skip to main content

golang 使用 consul 做服务发现

golang 使用 consul 做服务发现

当我们服务越来越多,如果服务配置了弹性伸缩,或者当服务不可用时,我们需要随时动态掌握可以使用的服务数量,并向可提供响应的服务发送请求。这时我们需要服务发现功能,当新增服务时,服务可以自动向consul注册,客户端直接向consul发送请求,获取可用服务的地址和端口;当服务不可用时,动态的更新consul,删除该服务在consul中的列表

docker安装consul

docker run --name consul1 -d -p 8500:8500 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8600:8600 consul:latest agent -server -bootstrap-expect 2 -ui -bind=0.0.0.0 -client=0.0.0.0
8500 http 端口,用于 http 接口和 web ui
8300 server rpc 端口,同一数据中心 consul server 之间通过该端口通信
8301 serf lan 端口,同一数据中心 consul client 通过该端口通信
8302 serf wan 端口,不同数据中心 consul server 通过该端口通信
8600 dns 端口,用于服务发现
-bbostrap-expect 2: 集群至少两台服务器,才能选举集群leader
-ui:运行 web 控制台
-bind: 监听网口,0.0.0.0 表示所有网口,如果不指定默认未127.0.0.1,则无法和容器通信
-client : 限制某些网口可以访问
docker run --name consul2 -d -p 8501:8500 consul agent -server -ui -bind=0.0.0.0 -client=0.0.0.0 -join 172.17.0.2
docker run --name consul2 -d -p 8502:8500 consul agent -server -ui -bind=0.0.0.0 -client=0.0.0.0 -join 172.17.0.2

consul_server.go

package main

import (
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"

consulapi "github.com/hashicorp/consul/api"
)

var count int64

// consul 服务端会自己发送请求,来进行健康检查
func consulCheck(w http.ResponseWriter, r *http.Request) {

s := "consulCheck" + fmt.Sprint(count) + "remote:" + r.RemoteAddr + " " + r.URL.String()
fmt.Println(s)
fmt.Fprintln(w, s)
count++
}

func registerServer() {

config := consulapi.DefaultConfig()
config.Address = "10.0.0.10:8500"
client, err := consulapi.NewClient(config)
if err != nil {
log.Fatal("consul client error : ", err)
}

registration := new(consulapi.AgentServiceRegistration)
registration.ID = "serverNode_1" // 服务节点的名称
registration.Name = "serverNode" // 服务名称
registration.Port = 9527 // 服务端口
registration.Tags = []string{"v1000"} // tag,可以为空
registration.Address = localIP() // 服务 IP

checkPort := 8080
registration.Check = &consulapi.AgentServiceCheck{ // 健康检查
HTTP: fmt.Sprintf("http://%s:%d%s", registration.Address, checkPort, "/check"),
Timeout: "3s",
Interval: "5s", // 健康检查间隔
DeregisterCriticalServiceAfter: "30s", //check失败后30秒删除本服务,注销时间,相当于过期时间
// GRPC: fmt.Sprintf("%v:%v/%v", IP, r.Port, r.Service),// grpc 支持,执行健康检查的地址,service 会传到 Health.Check 函数中
}

err = client.Agent().ServiceRegister(registration)
if err != nil {
log.Fatal("register server error : ", err)
}

http.HandleFunc("/check", consulCheck)
http.ListenAndServe(fmt.Sprintf(":%d", checkPort), nil)

}

func localIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return ""
}
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}

func main() {
registerServer()
}

consul_client.go

package main

import (
"fmt"
"net"
"strconv"

"github.com/Sirupsen/logrus"
"github.com/hashicorp/consul/api"
)

func main() {
var lastIndex uint64
config := api.DefaultConfig()
config.Address = "10.0.0.10:8500" //consul server

client, err := api.NewClient(config)
if err != nil {
fmt.Println("api new client is failed, err:", err)
return
}
services, metainfo, err := client.Health().Service("serverNode", "v1000", true, &api.QueryOptions{
WaitIndex: lastIndex, // 同步点,这个调用将一直阻塞,直到有新的更新
})
if err != nil {
logrus.Warn("error retrieving instances from Consul: %v", err)
}
lastIndex = metainfo.LastIndex

addrs := map[string]struct{}{}
for _, service := range services {
fmt.Println("service.Service.Address:", service.Service.Address, "service.Service.Port:", service.Service.Port)
addrs[net.JoinHostPort(service.Service.Address, strconv.Itoa(service.Service.Port))] = struct{}{}
}
}
go run ./consul_server.go

在浏览器中输入http://localhost:8500/ui/dc1/services 即可看到注册

image

执行 go run consul_client.go 即可获取到 server 注册的 IP和地址